package org.zjvis.dlt.service;

import static org.zjvis.dlt.constant.DltConstant.DEFAULT_DATABASE_NAME;
import static org.zjvis.dlt.utils.DataLineageUtil.distinctByKey;

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.zjvis.dlt.data.data_lineage.TableEdgeInfo;
import org.zjvis.dlt.data.data_lineage.TableLineageCanvasInfo;
import org.zjvis.dlt.data.data_lineage.TableNodeInfo;
import org.zjvis.dp.data.lineage.data.TableInfo;
import org.zjvis.dp.data.lineage.data.TableLineageInfo;
import org.zjvis.dp.data.lineage.enums.SQLType;
import org.zjvis.dp.data.lineage.parser.DataLineageParser;

/**
 * @author zhouyu
 * @create 2023-07-11 15:58
 */
@Service
public class TableLineageService {

    private final static Logger logger = LoggerFactory.getLogger("DataLineageService");

    public static final String TABLE_LINEAGE_ERROR = "table lineage parse error";

    @Resource
    private DataLineageParser dataLineageParser;


    public TableLineageCanvasInfo processTableLineageParser(String sqlType, String defaultDatabaseName, String sql) {
        try {
            TableLineageInfo tableLineageInfo = dataLineageParser.processTableLineageParse(sqlType, sql, defaultDatabaseName);

            if (Objects.isNull(tableLineageInfo) || Objects.isNull(tableLineageInfo.getTargetTable())
                    || CollectionUtils.isEmpty(tableLineageInfo.getSourceTables())) {
                return null;
            }
            //设置默认数据库名
            if (StringUtils.isBlank(defaultDatabaseName)) {
                defaultDatabaseName = DEFAULT_DATABASE_NAME;
            }

            TableInfo downTableInfo = tableLineageInfo.getTargetTable();
            String downDatabaseName = StringUtils.isBlank(downTableInfo.getDatabaseName()) ? defaultDatabaseName
                    : downTableInfo.getDatabaseName();
            String downTableName = downTableInfo.getTableName();

            List<TableNodeInfo> tableNodeInfos = Lists.newArrayList();
            List<TableEdgeInfo> tableEdgeInfoList = Lists.newArrayList();

            TableNodeInfo downTableNodeInfo = TableNodeInfo.builder()
                    .databaseName(downDatabaseName)
                    .tableName(downTableName)
                    .build();

            tableNodeInfos.add(downTableNodeInfo);

            for (TableInfo upTableInfo : tableLineageInfo.getSourceTables()) {
                String upDatabaseName = StringUtils.isBlank(upTableInfo.getDatabaseName()) ? defaultDatabaseName :
                        upTableInfo.getDatabaseName();

                TableNodeInfo upTableNodeInfo = TableNodeInfo.builder()
                        .databaseName(upDatabaseName)
                        .tableName(upTableInfo.getTableName())
                        .build();

                //添加节点
                tableNodeInfos.add(upTableNodeInfo);

                //添加边
                tableEdgeInfoList.add(TableEdgeInfo.builder()
                        .source(upTableNodeInfo)
                        .target(downTableNodeInfo)
                        .build());
            }

            //去重，后面也会统一去重
            tableNodeInfos = tableNodeInfos.stream()
                    .filter(distinctByKey(TableNodeInfo::generateIdentity))
                    .collect(Collectors.toList());

            tableEdgeInfoList = tableEdgeInfoList.stream()
                    .filter(distinctByKey(TableEdgeInfo::generateIdentity))
                    .collect(Collectors.toList());

            return TableLineageCanvasInfo.builder()
                    .tableNodeInfoList(tableNodeInfos)
                    .tableEdgeInfoList(tableEdgeInfoList)
                    .build();

        } catch (Exception e) {
            logger.error(TABLE_LINEAGE_ERROR, e);
            return null;
        }
    }

    public TableLineageCanvasInfo mergeTableCanvasInfo(List<TableLineageCanvasInfo> canvasInfos) {
        List<TableNodeInfo> tableNodeInfos = Lists.newArrayList();
        List<TableEdgeInfo> tableEdgeInfos = Lists.newArrayList();

        for(TableLineageCanvasInfo canvasInfo :canvasInfos) {
            tableNodeInfos.addAll(canvasInfo.getTableNodeInfoList());
            tableEdgeInfos.addAll(canvasInfo.getTableEdgeInfoList());
        }
        //去重
        tableNodeInfos = tableNodeInfos.stream()
                .filter(distinctByKey(TableNodeInfo::generateIdentity))
                .collect(Collectors.toList());

        tableEdgeInfos = tableEdgeInfos.stream()
                .filter(distinctByKey(TableEdgeInfo::generateIdentity))
                .collect(Collectors.toList());

        return TableLineageCanvasInfo.builder()
                .tableNodeInfoList(tableNodeInfos)
                .tableEdgeInfoList(tableEdgeInfos)
                .build();
    }
}
